package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that forwards a filtered stream of sensor records from one Kafka topic to another.
 *
 * <p>Two Kafka-backed tables with the columns {@code (id string, ts bigint, vc int)} and the
 * {@code json} format are registered: {@code source_sensor} over topic
 * {@code topic_source_sensor} and {@code sink_sensor} over topic {@code topic_sink_sensor}.
 * An {@code insert into ... select} statement then writes to the sink only those source rows
 * whose {@code id} equals {@code sensor_1}.
 */
public class Flink02_flinksql_KafkaToKafka {

    /** Kafka broker list used by both the source and the sink table definitions. */
    private static final String BOOTSTRAP_SERVERS =
            "hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092";

    /**
     * DDL for the source table: reads JSON from {@code topic_source_sensor} with consumer group
     * {@code wangting} and the {@code latest-offset} startup mode.
     */
    private static final String CREATE_SOURCE_TABLE =
            "create table source_sensor (id string, ts bigint, vc int) with("
                    + "'connector' = 'kafka',"
                    + "'topic' = 'topic_source_sensor',"
                    + "'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',"
                    + "'properties.group.id' = 'wangting',"
                    + "'scan.startup.mode' = 'latest-offset',"
                    + "'format' = 'json'"
                    + ")";

    /** DDL for the sink table: writes JSON to {@code topic_sink_sensor}. */
    private static final String CREATE_SINK_TABLE =
            "create table sink_sensor(id string, ts bigint, vc int) with("
                    + "'connector' = 'kafka',"
                    + "'topic' = 'topic_sink_sensor',"
                    + "'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',"
                    + "'format' = 'json'"
                    + ")";

    /** Statement that copies only the {@code sensor_1} rows from the source table to the sink. */
    private static final String INSERT_SENSOR_1_ROWS =
            "insert into sink_sensor select * from source_sensor where id='sensor_1'";

    /**
     * Sets up the table environment, registers both Kafka tables and submits the insert statement.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use a default parallelism of 1 for this job.
        streamEnv.setParallelism(1);

        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // Both tables must be registered before the insert statement can refer to them.
        sqlEnv.executeSql(CREATE_SOURCE_TABLE);
        sqlEnv.executeSql(CREATE_SINK_TABLE);
        sqlEnv.executeSql(INSERT_SENSOR_1_ROWS);
    }
}

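/*
Example session: records are typed into topic_source_sensor on one host, and only the
sensor_1 records appear on topic_sink_sensor on another host.
(`producer` and `consumer` are presumably local wrappers around the Kafka console clients.)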
[wangting@hdt-dmcp-ops01 bin]$ producer topic_source_sensor
>{"id": "sensor_1", "ts": 1000, "vc": 10}
>{"id": "sensor_2", "ts": 2000, "vc": 20}
>{"id": "sensor_1", "ts": 3000, "vc": 30}
>{"id": "sensor_2", "ts": 4000, "vc": 40}
>{"id": "sensor_1", "ts": 5000, "vc": 50}
>{"id": "sensor_2", "ts": 6000, "vc": 60}
>
---------------------------------------------------
[wangting@hdt-dmcp-ops02 ~]$ consumer topic_sink_sensor
{"id":"sensor_1","ts":1000,"vc":10}
{"id":"sensor_1","ts":3000,"vc":30}
{"id":"sensor_1","ts":5000,"vc":50}

 */
